import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../../common/test_page.dart';
import '../../utils.dart';

Stream<int> getStream(int n) async* {
  var k = 0;

  while (k < n) {
    Future<void>.delayed(const Duration(milliseconds: 100));

    yield k++;
  }
}

Stream<int> getTimeStream(int n) async* {
  var k = 1;

  yield 0;

  while (k < n) {
    yield await Future<void>.delayed(const Duration(milliseconds: 100))
        .then((_) => k++);
  }
}

class WindowTestPage extends TestPage {
  WindowTestPage(super.title) {
    test('Rx.windowCount.noStartBufferEvery', () async {
      expect(
          Rx.range(1, 4).windowCount(2).asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.windowCount.noStartBufferEvery.includesEventOnClose', () async {
      expect(
          Rx.range(1, 5).windowCount(2).asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.windowCount.startBufferEvery.count2startBufferEvery1', () async {
      expect(
          Rx.range(1, 4).windowCount(2, 1).asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.windowCount.startBufferEvery.count3startBufferEvery2', () async {
      expect(
          Rx.range(1, 8).windowCount(3, 2).asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.windowCount.startBufferEvery.count3startBufferEvery4', () async {
      expect(
          Rx.range(1, 8).windowCount(3, 4).asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.windowCount.reusable', () async {
      final transformer = WindowCountStreamTransformer<int>(2);

      expect(
          Stream.fromIterable(const [1, 2, 3, 4])
              .transform(transformer)
              .asyncMap((stream) => stream.toList()),
      );

      expect(
          Stream.fromIterable(const [1, 2, 3, 4])
              .transform(transformer)
              .asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.windowCount.asBroadcastStream', () async {
      final future = Stream.fromIterable(const [1, 2, 3, 4])
          .asBroadcastStream()
          .windowCount(2)
          .drain<void>();

      // listen twice on same stream
      expect(future);
      expect(future);
    });

    test('Rx.windowCount.error.shouldThrowA', () async {
      expect(
        Stream<void>.error(Exception()).windowCount(2),
      );
    });

    test(
      'Rx.windowCount.shouldThrow.invalidCount.negative',
          () {
        expect(() => Stream.fromIterable(const [1, 2, 3, 4]).windowCount(-1));
      },
    );

    test('Rx.windowCount.startBufferEvery.shouldThrow.invalidStartBufferEvery',
            () {
          expect(() => Stream.fromIterable(const [1, 2, 3, 4]).windowCount(2, -1));
        });

    test('Rx.windowCount.nullable', () {
      nullableTest<Stream<String?>>(
            (s) => s.windowCount(2),
      );
    });

    test('Rx.window', () async {
      expect(
          getStream(4)
              .window(Stream<void>.periodic(const Duration(milliseconds: 160))
              .take(3))
              .asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.window.sampleBeforeEvent.shouldEmit', () async {
      expect(
          Stream.fromFuture(
              Future<void>.delayed(const Duration(milliseconds: 200))
                  .then((_) => 'end'))
              .startWith('start')
              .window(Stream<void>.periodic(const Duration(milliseconds: 40))
              .take(10))
              .asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.window.shouldClose', () async {
      final controller = StreamController<int>()
        ..add(0)
        ..add(1)
        ..add(2)
        ..add(3);

      scheduleMicrotask(controller.close);

      expect(
          controller.stream
              .window(Stream<void>.periodic(const Duration(seconds: 3)))
              .asyncMap((stream) => stream.toList())
              .take(1),
      );
    });

    test('Rx.window.reusable', () async {
      final transformer = WindowStreamTransformer<int>((_) =>
          Stream<void>.periodic(const Duration(milliseconds: 160))
              .take(3)
              .asBroadcastStream());

      expect(
          getStream(4)
              .transform(transformer)
              .asyncMap((stream) => stream.toList()),
      );

      expect(
          getStream(4)
              .transform(transformer)
              .asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.window.asBroadcastStream', () async {
      final future = getStream(4)
          .asBroadcastStream()
          .window(Stream<void>.periodic(const Duration(milliseconds: 160))
          .take(10)
          .asBroadcastStream())
          .drain<void>();

      // listen twice on same stream
      expect(future);
      expect(future);
    });

    test('Rx.window.error.shouldThrowA', () async {
      expect(
          Stream<void>.error(Exception())
              .window(Stream<void>.periodic(const Duration(milliseconds: 160))),
      );
    });

    test('Rx.window.nullable', () {
      nullableTest<Stream<String?>>(
            (s) => s.window(Stream<void>.empty()),
      );
    });

    test('Rx.windowTest', () async {
      expect(
          Rx.range(1, 4)
              .windowTest((i) => i % 2 == 0)
              .asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.windowTest.reusable', () async {
      final transformer = WindowTestStreamTransformer<int>((i) => i % 2 == 0);

      expect(
          Stream.fromIterable(const [1, 2, 3, 4])
              .transform(transformer)
              .asyncMap((stream) => stream.toList()),
      );

      expect(
          Stream.fromIterable(const [1, 2, 3, 4])
              .transform(transformer)
              .asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.windowTest.asBroadcastStream', () async {
      final future = Stream.fromIterable(const [1, 2, 3, 4])
          .asBroadcastStream()
          .windowTest((i) => i % 2 == 0)
          .drain<void>();

      // listen twice on same stream
      expect(future);
      expect(future);
    });

    test('Rx.windowTest.error.shouldThrowA', () async {
      expect(
          Stream<int>.error(Exception()).windowTest((i) => i % 2 == 0));
    });

    test('Rx.windowTest.nullable', () {
      nullableTest<Stream<String?>>(
            (s) => s.windowTest((_) => true),
      );
    });

    test('Rx.windowTime', () async {
      expect(
          getTimeStream(4)
              .windowTime(const Duration(milliseconds: 160))
              .asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.windowTime.shouldClose', () async {
      final controller = StreamController<int>()
        ..add(0)
        ..add(1)
        ..add(2)
        ..add(3);

      scheduleMicrotask(controller.close);

      expect(
          controller.stream
              .windowTime(const Duration(seconds: 3))
              .asyncMap((stream) => stream.toList())
              .take(1),
      );
    });

    test('Rx.windowTime.reusable', () async {
      final transformer = WindowStreamTransformer<int>(
              (_) => Stream<void>.periodic(const Duration(milliseconds: 160)));

      expect(
          getTimeStream(4)
              .transform(transformer)
              .asyncMap((stream) => stream.toList()),
      );

      expect(
          getTimeStream(4)
              .transform(transformer)
              .asyncMap((stream) => stream.toList()),
      );
    });

    test('Rx.windowTime.asBroadcastStream', () async {
      final future = getTimeStream(4)
          .asBroadcastStream()
          .windowTime(const Duration(milliseconds: 160))
          .drain<void>();

      // listen twice on same stream
      expect(future);
      expect(future);
    });

    test('Rx.windowTime.error.shouldThrowA', () async {
      expect(
          Stream<void>.error(Exception())
              .windowTime(const Duration(milliseconds: 160)));
    });

    test('Rx.windowTime.nullable', () {
      nullableTest<Stream<String?>>(
            (s) => s.windowTime(Duration.zero),
      );
    });

  }

}